import Logs.MetaData
import Logs.EquivilantKeys
import Logs.Location
+import Messages.Progress
import Utility.Metered
import Utility.TimeStamp
import Utility.Env
import Network.HTTP.Types.URI
import Data.Time.Clock
import Text.Read
+import Control.Concurrent.STM
+import Control.Concurrent.Async
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString as B
deriving (Show, Eq)
instance Proto.Serializable PercentFloat where
- serialize (PercentFloat p) = show p
- deserialize s = PercentFloat <$> readMaybe s
+ serialize (PercentFloat p) = show p ++ "%"
+ deserialize s = do
+ s' <- reverse <$> stripPrefix "%" (reverse s)
+ PercentFloat <$> readMaybe s'
data ComputeState = ComputeState
{ computeParams :: [String]
-> (OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath)))
-- ^ get input file's content, or Nothing the input file's
-- content is not available
+ -> Maybe (Key, MeterUpdate)
+ -- ^ update meter for this key
-> (ComputeState -> OsPath -> NominalDiffTime -> Annex v)
-> Annex v
-runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont =
+runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont =
withOtherTmp $ \othertmpdir ->
withTmpDirIn othertmpdir (literalOsPath "compute") go
where
}
showOutput
starttime <- liftIO currentMonotonicTimestamp
- state' <- bracket
+ state' <- withmeterfile $ \meterfile -> bracket
(liftIO $ createProcess pr)
(liftIO . cleanupProcess)
- (getinput state tmpdir subdir)
+ (getinput tmpdir subdir state meterfile)
endtime <- liftIO currentMonotonicTimestamp
cont state' subdir (calcduration starttime endtime)
, return tmpdir
)
- getinput state' tmpdir subdir p =
+ getinput tmpdir subdir state' meterfile p =
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
Just l
- | null l -> getinput state' tmpdir subdir p
+ | null l -> getinput tmpdir subdir state' meterfile p
| otherwise -> do
- state'' <- parseoutput p tmpdir subdir state' l
- getinput state'' tmpdir subdir p
+ state'' <- parseoutput p tmpdir subdir state' meterfile l
+ getinput tmpdir subdir state'' meterfile p
Nothing -> do
liftIO $ hClose (stdoutHandle p)
liftIO $ hClose (stdinHandle p)
giveup $ program ++ " exited unsuccessfully"
return state'
- parseoutput p tmpdir subdir state' l = case Proto.parseMessage l of
+ parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of
Just (ProcessInput f) -> do
let f' = toOsPath f
let knowninput = M.member f' (computeInputs state')
Just (ProcessOutput f) -> do
let f' = toOsPath f
checksafefile tmpdir subdir f' "output"
- let knownoutput = M.member f' (computeOutputs state')
+ knownoutput <- case M.lookup f' (computeOutputs state') of
+ Nothing -> return False
+ Just mk -> do
+ when (mk == fmap fst meterkey) $
+ meterfile (subdir </> f')
+ return True
checkimmutable knownoutput "outputting" f' $
return $ if immutablestate
then state'
(computeOutputs state')
}
Just (ProcessProgress percent) -> do
- -- XXX
+ liftIO $ updatepercent percent
return state'
Just ProcessReproducible ->
return $ state' { computeReproducible = True }
Nothing -> giveup $
- program ++ " output included an unparseable line: \"" ++ l ++ "\""
+ program ++ " output an unparseable line: \"" ++ l ++ "\""
checksafefile tmpdir subdir f fileaction = do
let err problem = giveup $
liftIO . F.writeFile f =<< catObject gitsha
return f
+ withmeterfile a = case meterkey of
+ Nothing -> a (const noop)
+ Just (_, progress) -> do
+ filev <- liftIO newEmptyTMVarIO
+ endv <- liftIO $ newEmptyTMVarIO
+ let meterfile = void . liftIO
+ . atomically . tryPutTMVar filev
+ let endmeterfile = atomically $ putTMVar endv ()
+ tid <- liftIO $ async $ do
+ v <- liftIO $ atomically $
+ (Right <$> takeTMVar filev)
+ `orElse`
+ (Left <$> takeTMVar endv)
+ case v of
+ Right f -> watchFileSize f progress $ \_ ->
+ void $ liftIO $ atomically $
+ takeTMVar endv
+ Left () -> return ()
+ a meterfile
+ `finally` liftIO (endmeterfile >> wait tid)
+
+ updatepercent (PercentFloat percent) = case meterkey of
+ Nothing -> noop
+ Just (k, progress) -> case fromKey keySize k of
+ Nothing -> noop
+ Just sz ->
+ progress $ BytesProcessed $ floor $
+ fromIntegral sz * percent / 100
+
computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
computationBehaviorChangeError (ComputeProgram program) requestdesc p =
giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p
computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
-computeKey rs (ComputeProgram program) k _af dest p vc =
+computeKey rs (ComputeProgram program) k _af dest meterupdate vc =
getComputeState rs k >>= \case
- Just state ->
+ Just state ->
case computeskey state of
- Just keyfile -> runComputeProgram
- (ComputeProgram program)
- state
- (ImmutableState True)
- (getinputcontent state)
- (postcompute keyfile)
+ Just keyfile -> go state keyfile
Nothing -> missingstate
Nothing -> missingstate
where
missingstate = giveup "Missing compute state"
+ go state keyfile = metered (Just meterupdate) k Nothing $ \_ p ->
+ runComputeProgram (ComputeProgram program) state
+ (ImmutableState True)
+ (getinputcontent state)
+ (Just (k, p))
+ (postcompute keyfile)
+
getinputcontent state f =
case M.lookup f (computeInputs state) of
Just inputkey -> case keyGitSha inputkey of